-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Managed Iceberg streaming writes #32451
Conversation
assign set of reviewers |
Assigning reviewers. If you would like to opt out of this review, comment R: @damondouglas for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
Outdated
Show resolved
Hide resolved
...ceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
...ceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
...ceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
Show resolved
Hide resolved
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
Show resolved
Hide resolved
@@ -307,4 +314,38 @@ public void testWritePartitionedData() { | |||
assertThat( | |||
returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); | |||
} | |||
|
|||
@Test | |||
public void testStreamingWrite() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest a couple more tests where the user has set up their PCollection differently, like if it started out with accumulating mode, or if they set a weird trigger in the middle of their pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a test with fixed windows and accumulating mode. Let me know if there's anything particular we should test for
Thanks @kennknowles, this is ready for another look |
* iceberg streaming writes * cleanup * adress comments
Apply some windowing and add a new
triggering_frequency_seconds
parameter to support streaming writes to Iceberg tables.The triggering frequency controls how often we commit data and create new snapshots